// Copyright (C) 2020 THL A29 Limited, a Tencent company. All rights reserved.
//
// Licensed under the BSD 3-Clause License (the "License"); you may not use this
// file except in compliance with the License. You may obtain a copy of the
// License at
//
// https://opensource.org/licenses/BSD-3-Clause
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

#include "flare/base/internal/dpc.h"

#include <chrono>
#include <limits>
#include <mutex>
#include <utility>
#include <vector>

#include "flare/base/align.h"
#include "flare/base/chrono.h"
#include "flare/base/internal/background_task_host.h"
#include "flare/base/internal/cpu.h"
#include "flare/base/internal/time_keeper.h"
#include "flare/base/object_pool.h"
#include "flare/base/thread/thread_local/ref_counted.h"

using namespace std::literals;

namespace flare {

// Forward-declared so that `JobQueue` below can befriend
// `PoolTraits<JobQueue::Node>` (whose specialization is defined further down,
// once `JobQueue::Node` is complete).
template <class>
struct PoolTraits;

}  // namespace flare

namespace flare::internal {

namespace {

// SPSC queue of DPCs.
//
// Shamelessly copied from
// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
//
// The producer is the thread this queue belongs to (it calls `Push()`, see
// `QueueDpc()`). The consumer calls `Pop()` and must hold the queue via
// `Acquire()` / `Release()` so that at most one consumer runs at a time.
//
// Nodes form a singly-linked list:
//
//   first_free_ -> ... -> tail_ -> ... -> head_ -> nullptr
//
// `tail_` is the most recently consumed node. Nodes in [first_free_, tail_)
// have been consumed and are recycled by the producer (`AllocNode()` /
// `FreeSpareNodes()`); nodes after `tail_` up to and including `head_` carry
// pending callbacks.
class JobQueue : public RefCounted<JobQueue> {
 public:
  JobQueue() {
    // Dummy node: the list is never empty, so `head_` / `tail_` never become
    // null.
    auto node = object_pool::Get<Node>().Leak();
    node->next.store(nullptr, std::memory_order_relaxed);
    first_free_ = head_ = tail_ = node;
  }

  // Runs any DPCs still pending (on the thread destroying the queue) and
  // returns all nodes to the object pool.
  ~JobQueue() {
    while (!Acquire()) {
      // Spin wait. Hacky & slow, but works.
    }

    // Pending DPCs are called immediately rather than dropped. (Or should we
    // move them into a global list?)
    std::vector<Function<void()>> cbs;
    cbs.reserve(4096);
    while (true) {
      Pop(&cbs);  // `Pop()` clears `cbs` before refilling it.
      if (cbs.empty()) {
        break;
      }
      for (auto&& e : cbs) {
        e();
      }
    }

    // Free the nodes. After draining, every node from `first_free_` through
    // `head_` has been consumed.
    while (first_free_) {
      object_pool::Put<Node>(std::exchange(first_free_, first_free_->next));
    }

    // Never `Release()`-d, there's no point in doing so.
  }

  // Appends `job`. Must only be called from the producer thread.
  void Push(Function<void()>&& job) noexcept {
    // Bookkeeping done at scope exit, i.e. after the job has been enqueued.
    ScopedDeferred _([&] {
      constexpr auto kFreeInterval = 10ms;

      // Freeing spare nodes here ensures they're always freed in their
      // allocating scheduling group. Our object pool does not perform well
      // when nodes are freed elsewhere, unfortunately.
      auto now = ReadCoarseSteadyClock().time_since_epoch();
      if (last_bookkeeping_.load(std::memory_order_relaxed) + kFreeInterval <
          now) {
        FreeSpareNodes();
        last_bookkeeping_.store(now, std::memory_order_relaxed);

        // Refreshed periodically, as the thread may be migrated by the system.
        // Atomic because `GetNode()` is called from the flushing side.
        node_id_.store(numa::GetCurrentNode(), std::memory_order_relaxed);
      }
    });

    auto node = AllocNode();
    node->cb = std::move(job);
    node->next.store(nullptr, std::memory_order_relaxed);
    // The release store publishes `node` together with everything written to
    // it above (`cb` and the reset `next`): a consumer that observes `node`
    // through the acquire load in `Pop()` also observes those writes.
    head_.load(std::memory_order_relaxed)
        ->next.store(node, std::memory_order_release);
    head_.store(node, std::memory_order_relaxed);
  }

  // Moves pending callbacks into `cbs`, oldest first. `cbs` is cleared first
  // and then filled at most up to its capacity, so as not to do dynamic memory
  // allocation. Callers must have `Acquire()`-d the queue.
  void Pop(std::vector<Function<void()>>* cbs) noexcept {
    cbs->clear();

    // Work on a local copy so `tail_` itself is touched only once on each end,
    // avoiding false sharing with the producer.
    auto current = tail_.load(std::memory_order_relaxed);
    while (auto p = current->next.load(std::memory_order_acquire)) {
      FLARE_CHECK(p->cb);
      cbs->push_back(std::move(p->cb));
      // `current` is NOT freed here, we'll reuse it later in `AllocNode()`.
      current = p;
      if (FLARE_UNLIKELY(cbs->size() == cbs->capacity())) {
        break;
      }
    }
    // Pairs with the acquire loads in `AllocNode()` / `FreeSpareNodes()`.
    tail_.store(current, std::memory_order_release);
  }

  // Helps the user not to call `Pop()` concurrently. Returns `true` if the
  // caller now exclusively owns the consumer side.
  bool Acquire() noexcept {
    return !acquired_.exchange(true, std::memory_order_acquire);
  }

  // Gives up the consumer side obtained via `Acquire()`.
  void Release() noexcept { acquired_.store(false, std::memory_order_release); }

  // NUMA node the producer thread was last seen on.
  int GetNode() const noexcept {
    return node_id_.load(std::memory_order_relaxed);
  }

 private:
  struct Node;
  friend struct PoolTraits<Node>;

  // Returns a node for `Push()`: a consumed one if available, otherwise a
  // fresh one from the object pool. Producer-only.
  Node* AllocNode() noexcept {
    // `acquire` pairs with the `release` store in `Pop()`: once `tail_` has
    // moved past a node, the consumer is done with it and it may be reused.
    auto ptr =
        FLARE_LIKELY(first_free_ != tail_.load(std::memory_order_acquire))
            ? std::exchange(first_free_, first_free_->next)
            : object_pool::Get<Node>().Leak();
    ptr->alloc_ts.store(ReadCoarseSteadyClock().time_since_epoch(),
                        std::memory_order_relaxed);
    return ptr;
  }

  // Returns up to `kMaxFree` consumed nodes that were (re)allocated more than
  // `kMaxIdle` ago to the object pool. Producer-only.
  void FreeSpareNodes() {
    constexpr std::size_t kMaxFree = 128;
    constexpr auto kMaxIdle = 5s;
    auto expires_at = ReadCoarseSteadyClock().time_since_epoch() - kMaxIdle;

    for (std::size_t freed = 0; freed != kMaxFree; ++freed) {
      // `acquire` (as in `AllocNode()`) so that the consumer's accesses to the
      // nodes we're about to free happen-before the free.
      if (first_free_ == tail_.load(std::memory_order_acquire) ||
          first_free_->alloc_ts.load(std::memory_order_relaxed) >=
              expires_at) {
        break;
      }
      object_pool::Put<Node>(std::exchange(first_free_, first_free_->next));
    }
  }

 private:
  struct Node {
    std::atomic<Node*> next;
    // Time at which this node was last handed out by `AllocNode()`.
    std::atomic<std::chrono::steady_clock::duration> alloc_ts;
    Function<void()> cb;
  };

  // NUMA node of the producer thread. Written by the producer in `Push()`,
  // read by whoever calls `GetNode()`.
  std::atomic<int> node_id_{numa::GetCurrentNode()};

  // Last time we freed not-freed-yet nodes.
  std::atomic<std::chrono::steady_clock::duration> last_bookkeeping_{};

  // `head_` is only touched by the producer. `tail_` is written by the
  // consumer (release, in `Pop()`) and read by the producer (acquire) to find
  // out which nodes may be recycled.
  //
  // We don't align them to destructive hardware interference size as we don't
  // touch them concurrently much. Aligning them would increase our memory
  // footprint considerably, impacting performance.
  std::atomic<Node*> head_, tail_;
  // Oldest consumed node not yet recycled. Producer-only.
  Node* first_free_;

  // If set, someone is consuming it.
  alignas(hardware_destructive_interference_size) std::atomic<bool> acquired_{
      false};
};

// Per-thread DPC queues (presumably one lazily created per thread by
// `ThreadLocalRefCounted` — TODO confirm). `QueueDpc()` pushes into the calling
// thread's queue; `FlushDpcs()` visits all of them via `ForEach()`.
ThreadLocalRefCounted<JobQueue> job_queues;

// DPC runtime bootstrap: registers a `TimeKeeper` timer with a 10ms interval
// whose callback invokes `FlushDpcs()`, and kills that timer on destruction.
class DpcInitializer {
 public:
  DpcInitializer()
      : dpc_timer_(TimeKeeper::Instance()->AddTimer(
            {}, 10ms, [](auto) { FlushDpcs(); }, true)) {}

  ~DpcInitializer() { TimeKeeper::Instance()->KillTimer(dpc_timer_); }

 private:
  // Timer ID returned by `AddTimer()`, needed to kill it later.
  std::uint64_t dpc_timer_;
};

// Brings up the DPC runtime on first call. Construction of the function-local
// static is performed exactly once, even under concurrent first calls.
void InitializeDpcOnce() {
  [[maybe_unused]] static DpcInitializer dpc_runtime;
}

}  // namespace

// TODO(luobogao): Block caller if the internal DPC queue is piling up.
void QueueDpc(Function<void()>&& cb) noexcept {
  InitializeDpcOnce();
  job_queues->Push(std::move(cb));
}

// Schedules a drain of every thread's DPC queue. For each queue, a callback is
// handed to `BackgroundTaskHost` for the NUMA node the queue's producer was
// last seen on; that callback runs all DPCs pending in the queue. The DPCs are
// presumably run asynchronously w.r.t. this call (depends on
// `BackgroundTaskHost::Queue`).
void FlushDpcs() noexcept {
  job_queues.ForEach([](JobQueue* queue) {
    // An extra ref. to `queue` is kept by the callback. It's possible that by
    // the time the callback gets called, the queue's owner thread has already
    // exited. To avoid use-after-free, we need this extra ref.
    auto flush_cb = [queue = RefPtr(ref_ptr, queue)] {
      // Upper bound on DPCs popped per batch. The buffer is thread-local and
      // reserved once, so `Pop()` does not allocate afterwards.
      constexpr std::size_t kMaxDpcsPerBatch = 131072;

      // Someone else is already draining this queue.
      if (!queue->Acquire()) {
        return;
      }
      ScopedDeferred _([&] { queue->Release(); });

      thread_local std::vector<Function<void()>> cbs;
      std::size_t dpcs = 0;

      FLARE_CHECK(cbs.empty());  // It's a sane context.
      cbs.reserve(kMaxDpcsPerBatch);
      while (true) {
        queue->Pop(&cbs);
        if (cbs.empty()) {
          break;
        }
        for (auto&& e : cbs) {
          e();
        }
        dpcs += cbs.size();
      }
      // `cbs` is always empty here; report the accumulated count instead.
      FLARE_VLOG(100, "Run {} DPCs.", dpcs);
    };
    BackgroundTaskHost::Instance()->Queue(queue->GetNode(),
                                          std::move(flush_cb));
  });
}

}  // namespace flare::internal

namespace flare {

template <>
struct PoolTraits<internal::JobQueue::Node> {
  static constexpr auto kType = PoolType::MemoryNodeShared;
  static constexpr auto kLowWaterMark = 65536;
  static constexpr auto kHighWaterMark =
      std::numeric_limits<std::size_t>::max();
  static constexpr auto kMaxIdle = std::chrono::seconds(10);
  static constexpr auto kMinimumThreadCacheSize = 1024;
  static constexpr auto kTransferBatchSize = 1024;
};

}  // namespace flare
